package com.imooc.spark.project.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Utility class for common HBase operations: creating a table, scanning it,
 * writing a single cell and reading a single row.
 *
 * <p>The class is exposed as a lazily created singleton; obtain the shared
 * instance through {@link #getInstance()}. The ZooKeeper quorum and the HBase
 * root directory are hard-coded in the constructor.
 */
public class HBaseUtils {
    HBaseAdmin admin = null;
    Configuration configuration = null;

    /**
     * Builds the client configuration and opens the admin handle.
     *
     * <p>If the admin handle cannot be created the failure is printed and
     * {@code admin} stays {@code null}; {@link #create(String, String)} then
     * reports the problem explicitly.
     */
    private HBaseUtils() {
        configuration = new Configuration();
        // ZooKeeper quorum address
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        // HBase storage root on HDFS
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    /**
     * Returns the shared instance, creating it on first use.
     * The method is synchronized so that only one instance is ever built.
     *
     * @return the singleton {@code HBaseUtils}
     */
    public static synchronized HBaseUtils getInstance() {
        if (null == instance) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    /**
     * Creates a table with a single column family.
     *
     * <p>If the table already exists a message is printed and the method
     * returns without changing anything. It deliberately does not terminate
     * the JVM, because this helper runs inside larger applications.
     *
     * @param tableName    name of the table to create
     * @param columnFamily name of the column family to add to the new table
     * @throws IllegalStateException if the admin handle could not be created
     *                               when this utility was constructed
     * @throws Exception             if HBase rejects the existence check or
     *                               the table creation
     */
    public void create(String tableName, String columnFamily) throws Exception {
        if (admin == null) {
            throw new IllegalStateException("HBaseAdmin is not available; see the error printed at start-up");
        }
        // Bytes.toBytes encodes as UTF-8, unlike String.getBytes() which
        // depends on the platform default charset.
        if (admin.tableExists(Bytes.toBytes(tableName))) {
            System.out.println("table Exists!");
            return;
        }
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        tableDesc.addFamily(new HColumnDescriptor(columnFamily));
        admin.createTable(tableDesc);
        System.out.println("create table success!");
    }

    /**
     * Prints every row of the given table to standard output using a full
     * table scan. The table handle and the scanner are closed before the
     * method returns.
     *
     * @param tableName name of the table to scan
     * @throws IOException if the table cannot be opened or the scan fails
     */
    public void scan(String tableName) throws IOException {
        try (HTable table = openTable(tableName);
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                System.out.println("Scan：" + result);
            }
        }
    }

    /**
     * Opens an {@code HTable} handle for the given table name.
     *
     * <p>The caller owns the returned handle and is responsible for closing it.
     *
     * @param tableName name of the table to open
     * @return the table handle, or {@code null} if it could not be opened
     *         (the failure is printed to standard error)
     */
    public HTable getTable(String tableName) {
        try {
            return openTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Opens a table handle and lets the failure propagate, so that internal
     * callers can close the handle reliably and see the real cause.
     */
    private HTable openTable(String tableName) throws IOException {
        return new HTable(configuration, tableName);
    }

    /**
     * Writes a single cell to an HBase table.
     *
     * <p>This is a best-effort operation: an I/O failure is printed to
     * standard error and not rethrown. The table handle is always closed.
     *
     * @param tableName HBase table name
     * @param rowkey    row key of the target row
     * @param cf        column family
     * @param column    column qualifier
     * @param value     value to store
     */
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        Put put = new Put(Bytes.toBytes(rowkey));
        // Cell address is columnFamily:columnQualifier within the row.
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        try (HTable table = openTable(tableName)) {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the row stored under the given row key.
     *
     * <p>The table handle is closed before the method returns.
     *
     * @param tableName HBase table name
     * @param rowkey    row key to look up
     * @return the result of the lookup, or {@code null} if an I/O error
     *         occurred (the failure is printed to standard error)
     */
    public Result get(String tableName, String rowkey) {
        try (HTable table = openTable(tableName)) {
            return table.get(new Get(Bytes.toBytes(rowkey)));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * Manual smoke test: writes one sample cell to the click-count table.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String tableName = "imooc_course_clickcount";
        String rowkey = "20171111_88";  // row key
        String cf = "info";             // column family
        String column = "click_cout";   // column qualifier
        String value = "2";
        HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    }
}
